package com.itcast.hadoop.llyy.topkurl;/**
 * Created by Administrator on 2019/4/19 0019.
 */

import com.itcast.hadoop.flowsum.FlowBean;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/**
 * Reducer that sums the traffic of each URL and, after every key has been
 * reduced, emits the leading URLs until they cover 80% of the total traffic.
 *
 * <p>Per-URL totals are buffered in memory in a {@link TreeMap} keyed by
 * {@link FlowBean}; nothing is written until {@link #cleanup(Context)} runs.
 * NOTE(review): the 80% cut-off is computed over the keys seen by this reducer
 * instance only — presumably the job is configured with one reduce task; confirm.
 *
 * @author ydf
 * @com kt
 * @create 2019-04-19 3:36 PM
 **/
public class TopkURLReducer extends Reducer<Text, FlowBean, Text, LongWritable> {
    /** Fraction of the total traffic that the emitted URLs must cover. */
    private static final double TOP_TRAFFIC_RATIO = 0.8;

    // Per-URL totals, iterated in FlowBean's natural ordering.
    // NOTE(review): presumably FlowBean.compareTo sorts by total flow, largest first,
    // and never returns 0. If it does return 0 for equal totals, a later URL silently
    // replaces an earlier one in this map — verify against FlowBean.
    private TreeMap<FlowBean, Text> treeMap = new TreeMap<FlowBean, Text>();

    // Sum of the total flow of every URL reduced so far. Kept as a long because
    // it only ever accumulates integer flow values.
    private long globalCount = 0;

    /**
     * Sums the upstream and downstream flow of one URL and buffers the result.
     *
     * @param key     the URL
     * @param values  the flow records grouped under that URL
     * @param context the Hadoop reduce context (not written to here)
     * @throws IOException          declared by the overridden method
     * @throws InterruptedException declared by the overridden method
     */
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        // Copy the key: the framework may pass the same Text instance on every call.
        Text url = new Text(key.toString());
        long upSum = 0;
        long downSum = 0;
        for (FlowBean bean : values) {
            upSum += bean.getUp_flow();
            downSum += bean.getD_flow();
        }
        FlowBean flowBean = new FlowBean("", upSum, downSum);
        // Add this URL's total to the global counter; once all keys are processed
        // globalCount holds the total traffic across all URLs.
        globalCount += flowBean.getS_flow();
        treeMap.put(flowBean, url);
    }

    /**
     * Called once after all {@code reduce} calls. Walks the buffered URLs in map
     * order and writes {@code (url, total flow)} while the traffic already
     * written is below 80% of the overall total.
     *
     * @param context the Hadoop reduce context that receives the output
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        long emittedFlow = 0;
        for (Map.Entry<FlowBean, Text> entry : treeMap.entrySet()) {
            // When globalCount is 0 the ratio is NaN, the comparison is false,
            // and nothing is emitted.
            if ((double) emittedFlow / globalCount >= TOP_TRAFFIC_RATIO) {
                break;
            }
            long totalFlow = entry.getKey().getS_flow();
            // Emit the total flow: it is the quantity used for the running sum,
            // so the output value matches the cut-off calculation.
            context.write(entry.getValue(), new LongWritable(totalFlow));
            emittedFlow += totalFlow;
        }
    }
}
